package org.gbif.crawler;

import org.gbif.crawler.client.HttpCrawlClient;
import org.gbif.crawler.exception.FatalCrawlException;
import org.gbif.crawler.exception.ProtocolException;
import org.gbif.crawler.exception.TransportException;
import org.gbif.crawler.protocol.biocase.BiocaseCrawlConfiguration;
import org.gbif.crawler.protocol.biocase.BiocaseResponseHandler;
import org.gbif.crawler.protocol.biocase.BiocaseScientificNameRangeRequestHandler;
import org.gbif.crawler.protocol.digir.DigirCrawlConfiguration;
import org.gbif.crawler.protocol.digir.DigirResponseHandler;
import org.gbif.crawler.protocol.digir.DigirScientificNameRangeRequestHandler;
import org.gbif.crawler.protocol.tapir.TapirCrawlConfiguration;
import org.gbif.crawler.protocol.tapir.TapirResponseHandler;
import org.gbif.crawler.protocol.tapir.TapirScientificNameRangeRequestHandler;
import org.gbif.crawler.retry.LimitedRetryPolicy;
import org.gbif.crawler.strategy.ScientificNameRangeCrawlContext;
import org.gbif.crawler.strategy.ScientificNameRangeStrategy;
import org.gbif.wrangler.lock.Lock;
import org.gbif.wrangler.lock.NoLockFactory;

import java.net.URI;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.NotThreadSafe;

import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import org.apache.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * This crawler can be used to crawl datasets. It plays an orchestrating role and ties together a lot of other
 * components that have to be supplied using the constructor.
 * <p/>
 * This class is not thread-safe as one cannot rely on all the underlying components being thread safe.
 *
 * @param <CTX> the crawl context to be used, this holds the mutable state of a crawl and usually depends on the
 *        crawl strategy being used
 * @param <REQ> the type of request being created by the request handler and used by the crawl client
 * @param <RESP> the type of the response being returned by the crawl client and consumed by the response handler
 * @param <RES> the type of the result being generated by the response handler
 */
@NotThreadSafe
public class Crawler<CTX extends CrawlContext, REQ, RESP, RES> {

  private static final Logger LOG = LoggerFactory.getLogger(Crawler.class);
  // Supplies the sequence of crawl contexts (e.g. scientific name ranges) the main loop iterates over.
  private final CrawlStrategy<CTX> strategy;
  // Builds a protocol specific request for every crawl context; also exposes the page limit used for paging.
  private final RequestHandler<CTX, REQ> requestHandler;
  // Processes responses; additionally holds per-request state (record count, end-of-records flag,
  // content hash, validity) that setupNextCrawlContext() inspects after every request.
  private final ResponseHandler<RESP, RES> responseHandler;
  // Executes the requests built by the requestHandler and feeds responses to the responseHandler.
  private final CrawlClient<REQ, RESP> client;
  // Decides whether to retry a failed request, give the request up, or abort the whole crawl.
  private final RetryPolicy retryPolicy;
  // Acquired around every single request to coordinate with other crawlers sharing the lock.
  private final Lock lock;
  // Measures the duration of each request, reported to listeners via notifyResponse().
  private final Stopwatch stopwatch = new Stopwatch();
  // Listeners are invoked in insertion order and in the crawler's own thread.
  private final List<CrawlListener<CTX, REQ, RES>> listeners = Lists.newArrayList();

  /**
   * Creates a fully customized crawler from its collaborating components.
   * <p/>
   * Prefer the static factory methods ({@code newBiocaseCrawler}, {@code newDigirCrawler},
   * {@code newTapirCrawler}) unless non-default components are needed.
   *
   * @param strategy providing the crawl contexts to iterate over
   * @param requestHandler building a request for each crawl context
   * @param responseHandler processing the response of every request
   * @param client executing the requests
   * @param retryPolicy deciding when to retry, give up or abort
   * @param lock acquired around every request to coordinate with other crawlers
   * @throws NullPointerException if any argument is null
   */
  public Crawler(
    CrawlStrategy<CTX> strategy,
    RequestHandler<CTX, REQ> requestHandler,
    ResponseHandler<RESP, RES> responseHandler,
    CrawlClient<REQ, RESP> client,
    RetryPolicy retryPolicy,
    Lock lock) {
    this.strategy = checkNotNull(strategy, "strategy can't be null");
    this.requestHandler = checkNotNull(requestHandler, "requestHandler can't be null");
    this.responseHandler = checkNotNull(responseHandler, "responseHandler can't be null");
    this.client = checkNotNull(client, "client can't be null");
    this.retryPolicy = checkNotNull(retryPolicy, "retryPolicy can't be null");
    this.lock = checkNotNull(lock, "lock can't be null");
  }

  /**
   * Creates a new Crawler for a BioCASe endpoint using default settings:
   * <ul>
   * <li>Scientific name ranges will be used for crawling</li>
   * <li>No lock will be used</li>
   * <li>Connection timeouts of 10 minutes will be used</li>
   * <li>500 connections will be used in total</li>
   * </ul>
   *
   * @param datasetKey of the dataset to crawl, this is required right now but it can be any random UUID if a
   *        non-GBIF dataset should be crawled
   * @param attempt of this crawl
   * @param url to crawl
   * @param contentNamespace a contentNamespace this endpoint supports and that we should use
   * @param datasetTitle to request
   */
  public static Crawler<ScientificNameRangeCrawlContext, String, HttpResponse, List<Byte>> newBiocaseCrawler(
    UUID datasetKey, int attempt, URI url, String contentNamespace, String datasetTitle
    ) {
    BiocaseCrawlConfiguration job =
      new BiocaseCrawlConfiguration(datasetKey, attempt, url, contentNamespace, datasetTitle);
    return newCrawler(new BiocaseScientificNameRangeRequestHandler(job), new BiocaseResponseHandler());
  }

  /**
   * Creates a new Crawler for a DiGIR endpoint using default settings:
   * <ul>
   * <li>Scientific name ranges will be used for crawling</li>
   * <li>No lock will be used</li>
   * <li>Connection timeouts of 10 minutes will be used</li>
   * <li>500 connections will be used in total</li>
   * </ul>
   *
   * @param datasetKey of the dataset to crawl, this is required right now but it can be any random UUID if a
   *        non-GBIF dataset should be crawled
   * @param attempt of this crawl
   * @param url to crawl
   * @param resourceCode of the dataset
   * @param manis true if this is a MANIS endpoint
   */
  public static Crawler<ScientificNameRangeCrawlContext, String, HttpResponse, List<Byte>> newDigirCrawler(
    UUID datasetKey, int attempt, URI url, String resourceCode, boolean manis
    ) {
    DigirCrawlConfiguration job =
      new DigirCrawlConfiguration(datasetKey, attempt, url, resourceCode, manis);
    return newCrawler(new DigirScientificNameRangeRequestHandler(job), new DigirResponseHandler());
  }

  /**
   * Generic factory method creating a fully customized Crawler instance.
   *
   * @param strategy providing the crawl contexts to iterate over
   * @param requestHandler building a request for each crawl context
   * @param responseHandler processing the response of every request
   * @param client executing the requests
   * @param retryPolicy deciding when to retry, give up or abort
   * @param lock acquired around every request to coordinate with other crawlers
   * @return a new Crawler wired up from the given components
   * @throws NullPointerException if any argument is null
   */
  public static <CTX extends CrawlContext, REQ, RESP, RES> Crawler<CTX, REQ, RESP, RES> newInstance(
    CrawlStrategy<CTX> strategy,
    RequestHandler<CTX, REQ> requestHandler,
    ResponseHandler<RESP, RES> responseHandler,
    CrawlClient<REQ, RESP> client,
    RetryPolicy retryPolicy,
    Lock lock
    ) {
    return new Crawler<CTX, REQ, RESP, RES>(strategy, requestHandler, responseHandler, client, retryPolicy, lock);
  }

  /**
   * Creates a new Crawler for a TAPIR endpoint using default settings:
   * <ul>
   * <li>Scientific name ranges will be used for crawling</li>
   * <li>No lock will be used</li>
   * <li>Connection timeouts of 10 minutes will be used</li>
   * <li>500 connections will be used in total</li>
   * </ul>
   *
   * @param datasetKey of the dataset to crawl, this is required right now but it can be any random UUID if a
   *        non-GBIF dataset should be crawled
   * @param attempt of this crawl
   * @param url to crawl
   * @param contentNamespace a contentNamespace this endpoint supports and that we should use
   */
  public static Crawler<ScientificNameRangeCrawlContext, String, HttpResponse, List<Byte>> newTapirCrawler(
    UUID datasetKey, int attempt, URI url, String contentNamespace
    ) {
    TapirCrawlConfiguration job =
      new TapirCrawlConfiguration(datasetKey, attempt, url, contentNamespace);
    return newCrawler(new TapirScientificNameRangeRequestHandler(job), new TapirResponseHandler());
  }

  /**
   * Wires the default components shared by all protocol-specific factory methods:
   * scientific name range strategy, no locking, limited retries, and an HTTP client with a
   * 10 minute timeout, 500 connections maximum and 20 connections per route.
   */
  private static Crawler<ScientificNameRangeCrawlContext, String, HttpResponse, List<Byte>> newCrawler(
    RequestHandler<ScientificNameRangeCrawlContext, String> requestHandler,
    ResponseHandler<HttpResponse, List<Byte>> responseHandler
    ) {
    CrawlStrategy<ScientificNameRangeCrawlContext> rangeStrategy =
      new ScientificNameRangeStrategy(new ScientificNameRangeCrawlContext());

    // 10 minute timeout, 500 connections maximum, 20 per route
    CrawlClient<String, HttpResponse> httpClient = HttpCrawlClient.newInstance(600000, 500, 20);

    return newInstance(rangeStrategy, requestHandler, responseHandler, httpClient,
      new LimitedRetryPolicy(10, 2, 10, 2), NoLockFactory.getLock());
  }

  /**
   * Adds a listener to this crawler. The listeners will be called in the order they have been added and will run in the
   * same thread as the crawler.
   *
   * @param listener to add, may not be null
   * @throws NullPointerException if listener is null. Failing fast here is preferable to accepting the null: every
   *         notification loop catches and merely logs exceptions, so a null listener would otherwise produce a logged
   *         NPE on each of the many notifications instead of one clear error at registration time
   */
  public void addListener(CrawlListener<CTX, REQ, RES> listener) {
    listeners.add(checkNotNull(listener, "listener can't be null"));
  }

  /**
   * This starts a crawl using the provided strategy.
   * <p/>
   * It will iterate over the contexts the strategy provides. For each context it will also iterate over all the pages.
   * It uses the RequestHandler to build a request for each context which will then be retrieved by the CrawlClient and
   * the response will be handled by the ResponseHandler.
   */
  public void crawl() {

    // Initialization
    notifyStart();
    CTX currentContext = null;

    /*
     * Main loop, will run as long as we didn't get a user abort, there are more things to crawl
     * and retries have not been exhausted.
     * It's using two loops, one to loop over each of the Contexts returned by the CrawlStrategy which will depend
     * on the implementation and an inner loop that loops over pages for each context. That does not depend on the
     * strategy because there could be multiple pages for any context, this is dependent on the response handler.
     */
    try {
      while ((currentContext == null || !currentContext.isAborted())
        && !retryPolicy.abortCrawl()
        && strategy.hasNext()) {
        currentContext = strategy.next();

        // The inner loop
        boolean hasNextPage;
        do {
          notifyProgress(currentContext);

          REQ req = requestHandler.buildRequestUrl(currentContext);

          // The result is intentionally ignored here: setupNextCrawlContext reads the outcome of the
          // request (record count, end-of-records, content hash, validity) from the responseHandler.
          executeRequest(req, lock);

          hasNextPage = setupNextCrawlContext(currentContext);
        } while (!currentContext.isAborted() && !retryPolicy.abortCrawl() && hasNextPage);

      }
    } catch (FatalCrawlException e) {
      // A fatal error aborts the whole crawl immediately, skipping the normal finish notifications.
      notifyError(e);
      notifyFinishedAbnormally();
      return;
    }

    // Report why the main loop ended: user abort, exhausted retries, or a normal completion.
    if (currentContext != null && currentContext.isAborted()) {
      LOG.info("Aborted crawl on request");
      notifyFinishedOnUserRequest();
    } else if (retryPolicy.abortCrawl()) {
      LOG.info("Aborted crawl due to exhausted retries");
      notifyFinishedAbnormally();
    } else {
      notifyFinishedNormally();
    }
  }

  /**
   * Executes a request while honoring retries. If successful it also reports the response.
   *
   * @param req to execute
   * @param lock acquired before every attempt and released afterwards in any case, so other crawlers
   *        sharing the lock get a chance to run even when the request fails
   * @return either the result if successful or {@code null}
   * @throws FatalCrawlException propagated from the request execution, aborting the whole crawl
   */
  private RES executeRequest(REQ req, Lock lock) throws FatalCrawlException {
    boolean complete = false; // set once a request succeeded
    boolean tryAgain = true; // cleared once the retry policy tells us to give up this request
    int tryCount = 0;

    RES res = null;
    do {
      try {
        tryCount++;

        // First we need to get the lock, we're waiting 10 minutes between tries
        // TODO: At the moment we never abort this so we may wait forever
        while (!lock.tryLock(10, TimeUnit.MINUTES)) {
          LOG.debug("Failed to acquire lock [{}]", lock);
        }

        // Then we actually try to execute and handle the request
        notifyRequest(req, tryCount);
        stopwatch.reset();
        stopwatch.start();

        res = client.execute(req, responseHandler);
        stopwatch.stop();
        notifyResponse(res,
          tryCount,
          stopwatch.elapsedMillis(),
          responseHandler.getRecordCount(),
          responseHandler.isEndOfRecords());
        retryPolicy.successfulRequest();
        complete = true;
      } catch (TransportException e) {
        notifyError(e);
        if (retryPolicy.allowAfterTransportException()) {
          LOG.info("Got transport exception, will retry", e);
          tryAgain = true;
        } else {
          tryAgain = false;
          LOG.warn("Got transport exception, will give up this request");
          retryPolicy.giveUpRequest();
        }
      } catch (ProtocolException e) {
        notifyError(e);
        if (retryPolicy.allowAfterProtocolException()) {
          LOG.info("Got protocol exception, will retry", e);
          tryAgain = true;
        } else {
          tryAgain = false;
          LOG.warn("Got protocol exception, will give up this request");
          retryPolicy.giveUpRequest();
        }
      } finally {
        // We don't generally stop it here because we don't want to measure even more of "non request" time than we
        // already do. Splitting this code in multiple try/catch/finally blocks seems to heavyweight compared to adding
        // this extra check.
        if (stopwatch.isRunning()) {
          stopwatch.stop();
        }
        // We unlock even if the request was not successful to give other crawlers a chance to get this lock
        lock.unlock();
      }
    } while (!complete && tryAgain);

    return res;
  }

  /**
   * Advances the crawl context to the next page within the current range.
   *
   * @param speculative whether the next page is requested without evidence that more content exists
   */
  private void nextPage(CTX context, boolean speculative) {
    // TODO: This isn't very intelligent and could be improved. We could parse the response in the
    // crawler (a major architectural change), act on the diagnostics messages the providers send us,
    // or start with a very conservative page size and grow the number of requested records dynamically.

    // When the provider didn't report a record count we step by the full page limit.
    int pageSize = responseHandler.getRecordCount().or(requestHandler.getLimit());
    context.setOffset(context.getOffset() + pageSize);
    context.setSpeculative(speculative);
  }

  /**
   * Resets the crawl context for the start of a new range: speculative flag cleared, offset back to zero.
   */
  private void nextRange(CTX context) {
    context.setSpeculative(false);
    context.setOffset(0);
  }

  /**
   * Tells every registered listener about an error message. A listener failing to handle the
   * callback is logged and does not keep the remaining listeners from being notified.
   */
  private void notifyError(String msg) {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.error(msg);
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Tells every registered listener about an exception. Exceptions thrown by a listener itself
   * are logged and swallowed so all listeners still get notified.
   */
  private void notifyError(Throwable e) {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.error(e);
      } catch (Exception nested) {
        LOG.warn("Listener threw exception", nested);
      }
    }
  }

  /**
   * Informs every registered listener that the crawl finished abnormally.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyFinishedAbnormally() {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.finishCrawlAbnormally();
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Informs every registered listener that the crawl finished normally.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyFinishedNormally() {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.finishCrawlNormally();
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Informs every registered listener that the crawl was finished on user request.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyFinishedOnUserRequest() {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.finishCrawlOnUserRequest();
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Reports the current crawl context to every registered listener before a page is requested.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyProgress(CTX context) {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.progress(context);
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Reports an outgoing request and its attempt number to every registered listener.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyRequest(REQ req, int tryCount) {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.request(req, tryCount);
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Reports a successful response to every registered listener.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyResponse(
    RES result, int tryCount, long duration, Optional<Integer> recordCount, Optional<Boolean> endOfRecords
    ) {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.response(result, tryCount, duration, recordCount, endOfRecords);
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * Informs every registered listener that the crawl is starting.
   * Listener exceptions are logged and swallowed.
   */
  private void notifyStart() {
    for (CrawlListener<CTX, REQ, RES> crawlListener : listeners) {
      try {
        crawlListener.startCrawl();
      } catch (Exception ex) {
        LOG.warn("Listener threw exception", ex);
      }
    }
  }

  /**
   * This method inspects the state following a request and will determine the next course of action.
   * <p/>
   * This includes checks for whether the response was coherent and whether the server indicates if another page should
   * be requested, and also checks such as speculative execution when it is ambiguous, and also if it detects the same
   * response is being returned (safeguarding against infinite loops).
   * <p/>
   * These are all the handled cases:
   * <table border=1>
   * <tbody>
   * <tr>
   * <th>Nr.</th>
   * <th>Valid response?</th>
   * <th>End of Records?</th>
   * <th>Record count</th>
   * <th>Got content?</th>
   * <th>Action</th>
   * </tr>
   * <tr>
   * <td>1</td>
   * <td>yes</td>
   * <td>unknown</td>
   * <td>unknown</td>
   * <td>yes</td>
   * <td>next page</td>
   * </tr>
   * <tr>
   * <td>2</td>
   * <td>yes</td>
   * <td>unknown</td>
   * <td>unknown</td>
   * <td>no</td>
   * <td>next page (speculative)</td>
   * </tr>
   * <tr>
   * <td>3</td>
   * <td>yes</td>
   * <td>unknown</td>
   * <td>known</td>
   * <td>yes</td>
   * <td>next page, log error if record count == 0</td>
   * </tr>
   * <tr>
   * <td>4</td>
   * <td>yes</td>
   * <td>unknown</td>
   * <td>known</td>
   * <td>no</td>
   * <td>next page (speculative), log error if record count != 0</td>
   * </tr>
   * <tr>
   * <td>5</td>
   * <td>yes</td>
   * <td>true</td>
   * <td>unknown</td>
   * <td>yes</td>
   * <td>next range</td>
   * </tr>
   * <tr>
   * <td>6</td>
   * <td>yes</td>
   * <td>true</td>
   * <td>unknown</td>
   * <td>no</td>
   * <td>next range</td>
   * </tr>
   * <tr>
   * <td>7</td>
   * <td>yes</td>
   * <td>true</td>
   * <td>known</td>
   * <td>yes</td>
   * <td>next range, log error if record count == 0</td>
   * </tr>
   * <tr>
   * <td>8</td>
   * <td>yes</td>
   * <td>true</td>
   * <td>known</td>
   * <td>no</td>
   * <td>next range, log error if record count != 0</td>
   * </tr>
   * <tr>
   * <td>9</td>
   * <td>yes</td>
   * <td>false</td>
   * <td>unknown</td>
   * <td>yes</td>
   * <td>next page</td>
   * </tr>
   * <tr>
   * <td>10</td>
   * <td>yes</td>
   * <td>false</td>
   * <td>unknown</td>
   * <td>no</td>
   * <td>next page (speculative)</td>
   * </tr>
   * <tr>
   * <td>11</td>
   * <td>yes</td>
   * <td>false</td>
   * <td>known</td>
   * <td>yes</td>
   * <td>next page, log error if record count == 0</td>
   * </tr>
   * <tr>
   * <td>12</td>
   * <td>yes</td>
   * <td>false</td>
   * <td>known</td>
   * <td>no</td>
   * <td>next page (speculative), log error if record count != 0</td>
   * </tr>
   * <tr>
   * <td>13</td>
   * <td>yes</td>
   * <td>irrelevant</td>
   * <td>irrelevant</td>
   * <td>yes, same content as last request that contained content</td>
   * <td>next range, log error</td>
   * </tr>
   * <tr>
   * <td>14</td>
   * <td>yes</td>
   * <td>irrelevant</td>
   * <td>irrelevant</td>
   * <td>no, after speculative request</td>
   * <td>next range</td>
   * </tr>
   * <tr>
   * <td>15</td>
   * <td>yes</td>
   * <td>irrelevant</td>
   * <td>irrelevant</td>
   * <td>yes, after speculative request</td>
   * <td>according to one of the other rules, but we could log this one</td>
   * </tr>
   * <tr>
   * <td>16</td>
   * <td>no</td>
   * <td>irrelevant</td>
   * <td>irrelevant</td>
   * <td>irrelevant</td>
   * <td>RetryPolicy: Retry, skip, abort crawl</td>
   * </tr>
   * </tbody>
   * </table>
   * <p/>
   * <em>valid request</em> means that we did not get a {@link ProtocolException} or {@link TransportException}.
   * <p/>
   * <p/>
   * <p/>
   *
   * @return whether there is another page for the current crawl context. If not we'll try the next range.
   */
  private boolean setupNextCrawlContext(CTX context) {
    // Returning false means the current range is done (or abandoned) and the main loop should
    // ask the strategy for the next one; returning true requests another page of the same range.

    // 16: We did not get a valid reply so we're going to skip to the next range, the main loop outside of this method
    // will determine if we're going to abort because of too many errors.
    if (!responseHandler.isValidState()) {
      nextRange(context);
      return false;
    }

    Optional<Long> currentContentHash = responseHandler.getContentHash();
    // 13: Check to see if we got the exact same content as for the last request, if so skip to the next range
    // if not then proceed as normal
    if (currentContentHash.isPresent()
      && context.getLastContentHash().isPresent()
      && currentContentHash.get().equals(context.getLastContentHash().get())) {
      // Next range, log error
      notifyError("Got same content twice in a row, hash is [" + currentContentHash.get() + "]");
      nextRange(context);
      return false;
    }

    // We did check the old content hash so we can now update it with the new one
    if (currentContentHash.isPresent()) {
      context.setLastContentHash(currentContentHash);
    }

    // 14: Next range (last request was speculative but we didn't get any content)
    if (!currentContentHash.isPresent() && context.isSpeculative()) {
      nextRange(context);
      return false;
    }

    if (currentContentHash.isPresent() && context.isSpeculative()) {
      // 15: Only log previous and last request but otherwise continue with any normal request handling
      // The last request was speculative and we did get content this time, that is in theory valid behavior for
      // all protocols but is definitely not normal so we should log this and maybe take a look at the endpoints
      // in question. The content hash comparison or the normal request handling will be used to eventually
      // skip to the next range.

      // TODO: Log speculative request that lead to this
    }


    Optional<Integer> recordCount = responseHandler.getRecordCount();
    // An unknown end-of-records flag is treated as "not the end" and resolved by the paging rules below.
    if (responseHandler.isEndOfRecords().or(false)) {

      // 5, 6, 7 & 8
      nextRange(context);

      if (recordCount.isPresent()) {
        if (currentContentHash.isPresent()) {
          if (recordCount.get() == 0) {
            // 7: Next range, log error if record count == 0
            notifyError("Got content but record count says we got nothing");
          }
        } else {
          // 8: Next range, log error if record count != 0
          if (recordCount.get() != 0) {
            notifyError("Did not get content but record count says we got "
              + "[" + recordCount.get() + "]"
              + " records");
          }
        }

      }

      return false;
    } else {
      // 1-4 & 9-12: not (known to be) the end of records, keep paging within the current range.

      if (recordCount.isPresent()) {

        if (currentContentHash.isPresent()) {
          // 3 & 11: Next page, log error if record count == 0
          if (recordCount.get() == 0) {
            notifyError("Got content but record count says we got nothing");
          }

          nextPage(context, false);
          return true;
        } else {
          // 4 & 12: Next page (speculative), log error if record count != 0
          if (recordCount.get() != 0) {
            notifyError("Did not get content but record count says we got [" + recordCount.get() + "] records");
          }

          nextPage(context, true);
          return true;
        }

      } else {
        if (currentContentHash.isPresent()) {
          // 1 & 9: Next page
          nextPage(context, false);
          return true;
        } else {
          // 2 & 10: Next page (speculative)
          nextPage(context, true);
          return true;
        }
      }
    }

  }

}
